/**
 * 
 */
package fetcher;
/**
 * @author tonychao
 * Purpose: fetches data from the history server API of Spark 2.1.0 and stores it in the database.
 * When USE_DB_Flag=false, the database is not used.
 */

import java.io.IOException;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

import mysql.DB;
import others.Global;

import org.json.*;


/**
 * Reads application, stage and configuration data from a Spark history server.
 *
 * <p>The server base URL is taken from {@code Global.CURRENT_SPARK_HISTORY_SERVER}. Every request
 * URL is built by appending a relative path directly to that value, so the base URL is presumably
 * expected to end with a {@code '/'} (confirm against {@code Global}).
 */
public class APIFetcher {
	/** Pattern used by {@link #dateToStamp(String)} and {@link #stampToDate(long)}. */
	private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

	/** Timestamp layout of the Spark REST API, e.g. {@code 2015-02-03T16:42:40.000GMT}. */
	private static final String API_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'";

	/** Base URL of the history server (REST API and web pages). */
	String masterURL;
	/** Database handle; {@code null} until {@link #open()} has been called. */
	DB dbConnection;

	// Not read or written anywhere inside this class; kept because they are package-visible.
	String app_name;
	String app_id;
	long app_time;

	/** Creates a fetcher that points at the currently configured Spark history server. */
	public APIFetcher(){
		this.masterURL=Global.CURRENT_SPARK_HISTORY_SERVER;
	}

	/**
	 * Creates the {@link DB} object and initialises its connection.
	 *
	 * @throws ClassNotFoundException declared by the signature (propagated from the DB layer)
	 * @throws SQLException declared by the signature (propagated from the DB layer)
	 */
	public void open() throws ClassNotFoundException, SQLException{
		this.dbConnection=new DB();
		this.dbConnection.initConnection();
	}

	/**
	 * Closes the database handle if {@link #open()} created one; otherwise does nothing.
	 *
	 * @throws SQLException if closing the handle fails
	 */
	public void close() throws SQLException{
		if (this.dbConnection!=null) {
			this.dbConnection.close();
		}
	}

	/**
	 * Builds a {@link RunRecord} for one application: name, id, start/end time of the last
	 * listed attempt, the stage list and the Spark configuration.
	 *
	 * <p>Note from the original author: reading the same run record repeatedly causes insertion
	 * problems.
	 *
	 * @param app_id application id as listed by the history server
	 * @return the populated record
	 * @throws JSONException if the response is not the expected JSON, or lists no attempts
	 * @throws SQLException declared by the signature; kept for caller compatibility
	 * @throws ParseException if a stage timestamp cannot be parsed
	 * @throws InterruptedException declared by the signature; kept for caller compatibility
	 * @throws IOException declared by the signature; kept for caller compatibility
	 */
	public RunRecord readRunRecord(String app_id) throws JSONException,SQLException,  ParseException, InterruptedException, IOException{
		HTTPWebFetcher httpFetcher= new HTTPWebFetcher();
		String url=applicationUrl(app_id);
		APIOutPut("历史信息根目录位于："+url);
		String jString=httpFetcher.getData(url);

		RunRecord record= new RunRecord();
		JSONObject datajson = new JSONObject(jString);
		record.setApplicationName(datajson.getString("name"));
		record.setApplicationID(datajson.getString("id"));

		JSONArray attempts = datajson.getJSONArray("attempts");
		if (attempts.length()==0) {
			// Fail with a clear message instead of indexing attempts[-1].
			throw new JSONException("Application "+app_id+" lists no attempts");
		}
		// The last listed attempt is assumed to be the successful one.
		JSONObject attempt= attempts.getJSONObject(attempts.length()-1);
		// The epoch fields are JSON numbers; getLong accepts both numbers and numeric strings,
		// whereas getString is rejected for numbers by strict org.json versions.
		record.setApplicationStartTime(attempt.getLong("startTimeEpoch"));
		record.setApplicationEndTime(attempt.getLong("endTimeEpoch"));

		record.stageList=getStageList(app_id);
		record.conf=getConf(app_id);
		return record;
	}

	/**
	 * Lists the ids of the applications known to the history server.
	 *
	 * <p>{@code query} is appended verbatim to {@code api/v1/applications}, so it must include
	 * the leading {@code '?'} when it is not empty. Options of the Spark API:
	 * <ul>
	 *   <li>{@code ?status=[completed|running]} - list only applications in the chosen state</li>
	 *   <li>{@code ?minDate=[date]} - earliest date/time to list, e.g. {@code 2015-02-10} or
	 *       {@code 2015-02-03T16:42:40.000GMT}</li>
	 *   <li>{@code ?maxDate=[date]} - latest date/time to list; same format as minDate</li>
	 *   <li>{@code ?limit=[limit]} - limits the number of applications listed</li>
	 * </ul>
	 *
	 * @param query query string to append (may be empty)
	 * @return the application ids in the order returned by the server
	 * @throws JSONException if the response is not a JSON array of objects with an {@code id}
	 */
	public  ArrayList<String> getApplications(String query) throws JSONException{
		ArrayList<String> idList= new ArrayList<String>();

		HTTPWebFetcher httpFetcher= new HTTPWebFetcher();
		String jString=httpFetcher.getData(this.masterURL+"api/v1/applications"+query);
		JSONArray dataJson = new JSONArray(jString);
		for (int i =0;i<dataJson.length();i++){
			idList.add(dataJson.getJSONObject(i).getString("id"));
		}
		return idList;
	}

	/**
	 * Checks whether an application reported as completed really finished successfully.
	 *
	 * <p>An application listed by {@code /applications?status=completed} may in fact have been
	 * killed part-way; its job list is then empty or contains jobs that did not succeed.
	 *
	 * <p>This check is deliberately best-effort: when the job list cannot be fetched or parsed
	 * the problem is logged and {@code true} is returned, because the record must not be
	 * discarded before the cause has been verified.
	 *
	 * @param app_id application id
	 * @return {@code false} if the job list is empty or any job's status is not
	 *         {@code SUCCEEDED}; {@code true} otherwise, including when the check itself failed
	 */
	public boolean getCompleteApplicationStatus(String app_id){
		HTTPWebFetcher httpFetcher= new HTTPWebFetcher();

		try {
			String jString=httpFetcher.getData(applicationUrl(app_id)+"/jobs");
			JSONArray jobs = new JSONArray(jString);
			// An empty job list means the application died before doing any work.
			if (jobs.length()<=0) {
				return false;
			}
			// A single job that did not succeed also marks the application as failed.
			for (int i =0;i<jobs.length();i++){
				String jobStatus= jobs.getJSONObject(i).getString("status");
				if (!"SUCCEEDED".equals(jobStatus)) {
					return false;
				}
			}
		} catch (JSONException e) {
			APIOutPut("此日志文件对应的应用没有成功运行,但是在原因核实之前不能删除记录");
		} catch (Exception e) {
			// Still best-effort, but no longer silent: report what went wrong.
			APIOutPut("Unexpected error while checking the jobs of application "+app_id+": "+e);
		}
		return true;
	}

	/**
	 * Fetches all stages of an application.
	 *
	 * @param app_id application id
	 * @return one {@link StageInfo} per entry of the {@code /stages} response, in response order
	 * @throws JSONException if the response is not the expected JSON
	 * @throws ParseException if a stage timestamp cannot be parsed
	 */
	private ArrayList<StageInfo> getStageList(String app_id) throws JSONException, ParseException {
		ArrayList<StageInfo> ret= new ArrayList<>();
		HTTPWebFetcher apiFetcher= new HTTPWebFetcher();
		String jString=apiFetcher.getData(applicationUrl(app_id)+"/stages");
		JSONArray datajson= new JSONArray(jString);
		for (int i =0;i<datajson.length();i++){
			ret.add(getStageInfo(datajson.getJSONObject(i)));
		}
		return ret;
	}

	/**
	 * Extracts the Spark configuration by scraping the history server's environment web page
	 * (e.g. {@code http://localhost:18080/history/local-1500885399624/environment/}).
	 *
	 * <p>Only the part of the page between the {@code ">Spark Properties"} and
	 * {@code "System Properties"} markers is handed to
	 * {@link SparkConfiguration#findConfInHTML(String)}; the chosen configuration is then printed.
	 *
	 * @param app_id application id
	 * @return the configuration parsed from the page
	 * @throws StringIndexOutOfBoundsException if the page does not contain both markers in order
	 */
	public SparkConfiguration getConf(String app_id){
		HTTPWebFetcher apiFetcher= new HTTPWebFetcher();
		String htmlStr= apiFetcher.getData(this.masterURL+"history/"+app_id+"/environment/");

		int start=htmlStr.indexOf(">Spark Properties");
		int end=htmlStr.indexOf("System Properties");
		if (start<0 || end<start) {
			// Same exception type substring() would raise, but with a message that names the cause.
			throw new StringIndexOutOfBoundsException(
					"Environment page of application "+app_id
					+" has no 'Spark Properties' section followed by 'System Properties'");
		}
		htmlStr=htmlStr.substring(start,end);

		SparkConfiguration configuration =new SparkConfiguration();
		configuration.findConfInHTML(htmlStr);
		configuration.printChosenConf();
		return configuration;
	}

	/**
	 * Parses one stage entry of the {@code /stages} response.
	 *
	 * @param jsonObject stage object; must contain {@code name}, {@code submissionTime},
	 *                   {@code completionTime} and {@code inputBytes}
	 * @return the stage with name, submission/completion time (epoch milliseconds) and input bytes
	 * @throws JSONException if a required key is missing or has the wrong type
	 * @throws ParseException if a timestamp does not match the REST API layout
	 */
	private StageInfo getStageInfo(JSONObject jsonObject) throws JSONException, ParseException {
		StageInfo stage= new StageInfo();
		stage.setStageName(jsonObject.getString("name"));
		stage.setSubmissionTime(parseApiTime(jsonObject.getString("submissionTime")));
		stage.setCompletionTime(parseApiTime(jsonObject.getString("completionTime")));
		stage.setInputBytes_APIONLY(jsonObject.getLong("inputBytes"));
		return stage;
	}

	/**
	 * Converts a REST API timestamp such as {@code 2015-02-03T16:42:40.000GMT} to epoch
	 * milliseconds.
	 *
	 * <p>The value is interpreted in GMT, as its suffix states. Parsing it in the JVM default
	 * time zone would shift the result by the local UTC offset and make stage times inconsistent
	 * with the epoch-based application start/end times.
	 *
	 * @param apiTime timestamp text from the API
	 * @return epoch milliseconds
	 * @throws ParseException if the text does not match the expected layout
	 */
	private static long parseApiTime(String apiTime) throws ParseException {
		// SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
		SimpleDateFormat apiFormat = new SimpleDateFormat(API_DATE_PATTERN, Locale.US);
		apiFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
		return apiFormat.parse(apiTime).getTime();
	}

	/** Returns the REST URL of one application: base URL + {@code api/v1/applications/} + id. */
	private String applicationUrl(String app_id) {
		return this.masterURL+"api/v1/applications/"+app_id;
	}

	/**
	 * Converts a date string to a timestamp.
	 *
	 * @param s text in the form {@code yyyy-MM-dd HH:mm:ss.SSS}, interpreted in the JVM default
	 *          time zone
	 * @return epoch milliseconds
	 * @throws ParseException if {@code s} does not match the pattern
	 */
	public static long dateToStamp(String s) throws ParseException{
		SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_PATTERN);
		return simpleDateFormat.parse(s).getTime();
	}

	/**
	 * Converts a timestamp to a date string.
	 *
	 * @param lt epoch milliseconds
	 * @return text in the form {@code yyyy-MM-dd HH:mm:ss.SSS}, rendered in the JVM default
	 *         time zone
	 */
	public static String stampToDate(long lt){
		SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_PATTERN);
		return simpleDateFormat.format(new Date(lt));
	}

	/**
	 * Prints one line to standard output, prefixed with the fetcher module tag.
	 *
	 * @param line message to print
	 */
	public static void APIOutPut(String line){
		System.out.println("[fetcher 模块]:\t"+line);
	}
}
